In [ ]:
import pandas as pd
import mmlspark
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
In [ ]:
# Fetch the book-review sample file if it is not already present locally, then
# load it into a Spark DataFrame and preview the first rows.
import os
# `import urllib` alone does not guarantee that the `request` submodule is
# loaded; import it explicitly so `urllib.request.urlretrieve` always resolves.
import urllib.request

dataFile = "BookReviewsFromAmazon10K.tsv"
# Two columns, both declared with nullable=False: an integer rating and the review text.
textSchema = StructType([StructField("rating", IntegerType(), False),
                         StructField("text", StringType(), False)])
# Download only when there is no local copy.
if not os.path.isfile(dataFile):
    urllib.request.urlretrieve("https://mmlspark.azureedge.net/datasets/" + dataFile, dataFile)
# The file is read as tab-separated with no header row.
# NOTE(review): `spark` is not defined in this cell; presumably the notebook
# environment provides the SparkSession under that name - confirm.
data = spark.createDataFrame(pd.read_csv(dataFile, sep="\t", header=None), textSchema)
data.limit(10).toPandas()
Use TextFeaturizer to generate our features column. We remove stop words, and use TF-IDF
to generate 2¹⁶ (65,536) sparse features.
In [ ]:
from mmlspark.TextFeaturizer import TextFeaturizer

# Build the featurizer that reads the "text" column and writes "features",
# then fit it on the loaded reviews.
textFeaturizer = (
    TextFeaturizer()
    .setInputCol("text")
    .setOutputCol("features")
    .setUseStopWordsRemover(True)
    .setUseIDF(True)
    .setMinDocFreq(5)
    .setNumFeatures(1 << 16)  # 1 << 16 == 65,536
    .fit(data)
)
In [ ]:
# Apply the fitted featurizer to the reviews and preview the first five rows.
processedData = textFeaturizer.transform(data)
(processedData
    .limit(5)
    .toPandas())
Change the label so that we can predict whether the rating is greater than 3 using a binary classifier.
In [ ]:
# Add a "label" column that is true when the rating is strictly greater than 3,
# and keep only the two columns the classifier needs.
processedData = (
    processedData
    .withColumn("label", processedData["rating"] > 3)
    .select(["features", "label"])
)
(processedData
    .limit(5)
    .toPandas())
Train several Logistic Regression models with different regularizations.
In [ ]:
from pyspark.ml.classification import LogisticRegression
from mmlspark.TrainClassifier import TrainClassifier

# Randomly split the labelled data using weights 0.60 / 0.20 / 0.20.
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20])

# One logistic-regression estimator per candidate regParam value.
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
    LogisticRegression(regParam=hyperParam)
    for hyperParam in lrHyperParams
]

# Fit each candidate on the training split, using "label" as the label column.
lrmodels = [
    TrainClassifier(model=lrm, labelCol="label").fit(train)
    for lrm in logisticRegressions
]
Find the model with the best AUC on the test set.
In [ ]:
from mmlspark import FindBestModel, BestModel

# Compare the trained candidates on the test split using the "AUC" metric.
bestModel = FindBestModel(
    evaluationMetric="AUC",
    models=lrmodels,
).fit(test)

# Save the result (overwriting any existing copy) and load it back from disk.
bestModel.write().overwrite().save("model.mml")
loadedBestModel = BestModel.load("model.mml")
Use the optimized ComputeModelStatistics API to find the model accuracy.
In [ ]:
from mmlspark.ComputeModelStatistics import ComputeModelStatistics

# Score the validation split with the reloaded model, then compute statistics
# over those predictions.
predictions = loadedBestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)

# Read "accuracy" from the first metrics row and print it as a percentage.
print(
    "Best model's accuracy on validation set = "
    + "{0:.2f}%".format(metrics.first()["accuracy"] * 100)
)